package kafka

import "github.com/Shopify/sarama"

// PartitionConsumer is this package's abstraction over a consumer bound to a
// single partition. Unlike the sarama interface it wraps, messages are handed
// out in batches (a slice) rather than through a channel.
type PartitionConsumer interface {
	// AsyncClose requests shutdown of the consumer.
	// NOTE(review): presumably non-blocking, mirroring sarama's AsyncClose — confirm.
	AsyncClose()
	// Close shuts the consumer down and reports any error from doing so.
	Close() error
	// Messages returns a batch of consumed messages; the int argument is the
	// requested batch size.
	Messages(int) ([]*ConsumerMessage, error)
	// HighWaterMarkOffset returns the high water mark offset reported by the
	// consumer.
	HighWaterMarkOffset() int64
}

// partitionConsumer wraps a sarama.PartitionConsumer; its methods in this
// file delegate to the wrapped value.
type partitionConsumer struct {
	// partitionConsumer is the underlying sarama consumer all calls are
	// forwarded to.
	partitionConsumer sarama.PartitionConsumer
}

// AsyncClose forwards the call to the wrapped sarama partition consumer's
// AsyncClose method.
func (p *partitionConsumer) AsyncClose() {
	inner := p.partitionConsumer
	inner.AsyncClose()
}

// Close forwards the call to the wrapped sarama partition consumer's Close
// method and returns whatever error it reports.
func (p *partitionConsumer) Close() error {
	err := p.partitionConsumer.Close()
	return err
}

// Messages collects up to num messages from the wrapped consumer's message
// channel and returns them, each converted with getMessage.
//
// The call blocks until num messages have been received or the underlying
// channel is closed, whichever happens first; in the latter case fewer than
// num messages (possibly none) are returned. A non-positive num returns
// immediately with no messages rather than blocking to fetch one.
//
// The returned error is always nil.
func (p partitionConsumer) Messages(num int) ([]*ConsumerMessage, error) {
	// Nothing was requested: do not block on the channel or consume a message.
	if num <= 0 {
		return nil, nil
	}

	var msgList []*ConsumerMessage
	for msg := range p.partitionConsumer.Messages() {
		msgList = append(msgList, getMessage(msg))
		// Stop as soon as the batch is full so no extra message is taken
		// off the channel.
		if len(msgList) >= num {
			break
		}
	}
	return msgList, nil
}

// HighWaterMarkOffset returns the high water mark offset reported by the
// wrapped sarama partition consumer.
func (p partitionConsumer) HighWaterMarkOffset() int64 {
	offset := p.partitionConsumer.HighWaterMarkOffset()
	return offset
}
